/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.core;



import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.annotations.Experimental.Kind;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.CoderRegistry;

import com.bff.gaia.unified.sdk.state.BagState;

import com.bff.gaia.unified.sdk.state.CombiningState;

import com.bff.gaia.unified.sdk.state.MapState;

import com.bff.gaia.unified.sdk.state.SetState;

import com.bff.gaia.unified.sdk.state.State;

import com.bff.gaia.unified.sdk.state.StateBinder;

import com.bff.gaia.unified.sdk.state.StateSpec;

import com.bff.gaia.unified.sdk.state.StateSpecs;

import com.bff.gaia.unified.sdk.state.ValueState;

import com.bff.gaia.unified.sdk.state.WatermarkHoldState;

import com.bff.gaia.unified.sdk.transforms.Combine.CombineFn;

import com.bff.gaia.unified.sdk.transforms.CombineWithContext.CombineFnWithContext;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.transforms.windowing.TimestampCombiner;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Equivalence;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;



import java.io.IOException;

import java.io.Serializable;

import java.util.Objects;



/** Static utility methods for creating {@link StateTag} instances. */

@Experimental(Kind.STATE)

public class StateTags {



  // Default coder registry, created once when this class is initialized.
  // NOTE(review): nothing in the visible part of this class reads this field — confirm whether
  // it is still needed.
  private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault();



  /**
   * An {@link Equivalence} that considers two {@link StateTag}s the same when the strings returned
   * by their {@code getId()} methods are equal; nothing else about the tags is compared.
   */
  public static final Equivalence<StateTag> ID_EQUIVALENCE =
      new Equivalence<StateTag>() {
        @Override
        protected boolean doEquivalent(StateTag left, StateTag right) {
          String leftId = left.getId();
          String rightId = right.getId();
          return leftId.equals(rightId);
        }

        @Override
        protected int doHash(StateTag tag) {
          String tagId = tag.getId();
          return tagId.hashCode();
        }
      };



  /**
   * Wraps a tag-based {@link StateTag.StateBinder} so that it can be used where an id-and-spec
   * based {@link StateBinder} is expected.
   *
   * <p>Each callback rebuilds a tag from the supplied id and spec through {@link #tagForSpec} and
   * forwards that tag, together with the remaining arguments, to the corresponding method of
   * {@code binder}.
   *
   * @param binder the tag-based binder every call is delegated to
   * @return an adapter that delegates to {@code binder}
   * @deprecated for migration purposes only
   */
  @Deprecated
  private static StateBinder adaptTagBinder(final StateTag.StateBinder binder) {
    return new StateBinder() {
      @Override
      public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
        StateTag<ValueState<T>> valueTag = tagForSpec(id, spec);
        return binder.bindValue(valueTag, coder);
      }

      @Override
      public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
        StateTag<BagState<T>> bagTag = tagForSpec(id, spec);
        return binder.bindBag(bagTag, elemCoder);
      }

      @Override
      public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
        StateTag<SetState<T>> setTag = tagForSpec(id, spec);
        return binder.bindSet(setTag, elemCoder);
      }

      @Override
      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
          String id,
          StateSpec<MapState<KeyT, ValueT>> spec,
          Coder<KeyT> mapKeyCoder,
          Coder<ValueT> mapValueCoder) {
        StateTag<MapState<KeyT, ValueT>> mapTag = tagForSpec(id, spec);
        return binder.bindMap(mapTag, mapKeyCoder, mapValueCoder);
      }

      @Override
      public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining(
          String id,
          StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
          Coder<AccumT> accumCoder,
          CombineFn<InputT, AccumT, OutputT> combineFn) {
        StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag = tagForSpec(id, spec);
        return binder.bindCombiningValue(combiningTag, accumCoder, combineFn);
      }

      @Override
      public <InputT, AccumT, OutputT>
          CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
              String id,
              StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
              Coder<AccumT> accumCoder,
              CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
        StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag = tagForSpec(id, spec);
        return binder.bindCombiningValueWithContext(combiningTag, accumCoder, combineFn);
      }

      @Override
      public WatermarkHoldState bindWatermark(
          String id, StateSpec<WatermarkHoldState> spec, TimestampCombiner timestampCombiner) {
        StateTag<WatermarkHoldState> holdTag = tagForSpec(id, spec);
        return binder.bindWatermark(holdTag, timestampCombiner);
      }
    };
  }



  /** Separates system-internal tags from user tags; each kind carries a one-character prefix. */
  private enum StateKind {
    /** Kind requested by {@code makeSystemTagInternal}; prefix {@code 's'}. */
    SYSTEM('s'),
    /** Kind assigned when a structured id is built from a raw id alone; prefix {@code 'u'}. */
    USER('u');

    /** Character written in front of the raw id when a structured id is appended. */
    private final char prefix;

    StateKind(char prefixChar) {
      prefix = prefixChar;
    }
  }



  /** Private so that this holder of static members cannot be instantiated from outside. */
  private StateTags() {}



  /**
   * Implemented by tags that can be re-issued under a different {@link StateKind}.
   *
   * <p>{@link #makeSystemTagInternal} checks for this interface before converting a tag.
   */
  private interface SystemStateTag<StateT extends State> {

    /** Returns a tag for the same state, reported under the given {@code kind}. */
    StateTag<StateT> asKind(StateKind kind);

  }



  /**
   * Builds a {@link StateTag} from a raw id and an existing {@link StateSpec}.
   *
   * @param id the raw id of the tag; it is wrapped as a user-kind structured id
   * @param spec the spec the returned tag exposes
   * @return a new tag pairing {@code id} with {@code spec}
   */
  public static <StateT extends State> StateTag<StateT> tagForSpec(
      String id, StateSpec<StateT> spec) {
    StructuredId userId = new StructuredId(id);
    return new SimpleStateTag<StateT>(userId, spec);
  }



  /**
   * Creates a tag for a single value of type {@code T}.
   *
   * @param id the raw id of the tag
   * @param valueCoder the coder handed to {@link StateSpecs#value} to build the tag's spec
   * @return a new user-kind tag for a {@link ValueState}
   */
  public static <T> StateTag<ValueState<T>> value(String id, Coder<T> valueCoder) {
    StructuredId userId = new StructuredId(id);
    StateSpec<ValueState<T>> valueSpec = StateSpecs.value(valueCoder);
    return new SimpleStateTag<>(userId, valueSpec);
  }



  /**
   * Creates a tag for state that relies on a {@link CombineFn} to automatically merge multiple
   * {@code InputT}s into a single {@code OutputT}.
   *
   * @param id the raw id of the tag
   * @param accumCoder the coder for the accumulator type
   * @param combineFn the combine function handed to {@link StateSpecs#combining}
   * @return a new user-kind tag for a {@link CombiningState}
   */
  public static <InputT, AccumT, OutputT>
      StateTag<CombiningState<InputT, AccumT, OutputT>> combiningValue(
          String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
    StructuredId userId = new StructuredId(id);
    StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec =
        StateSpecs.combining(accumCoder, combineFn);
    return new SimpleStateTag<>(userId, combiningSpec);
  }



  /**
   * Creates a tag for state that relies on a {@link CombineFnWithContext} to automatically merge
   * multiple {@code InputT}s into a single {@code OutputT}.
   *
   * @param id the raw id of the tag
   * @param accumCoder the coder for the accumulator type
   * @param combineFn the context-aware combine function handed to {@link StateSpecs#combining}
   * @return a new user-kind tag for a {@link CombiningState}
   */
  public static <InputT, AccumT, OutputT>
      StateTag<CombiningState<InputT, AccumT, OutputT>> combiningValueWithContext(
          String id,
          Coder<AccumT> accumCoder,
          CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
    StructuredId userId = new StructuredId(id);
    StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec =
        StateSpecs.combining(accumCoder, combineFn);
    return new SimpleStateTag<>(userId, combiningSpec);
  }



  /**
   * Creates a tag for state that relies on a {@link CombineFn} to automatically merge multiple
   * {@code InputT}s into a single {@code OutputT}, starting from the input coder.
   *
   * <p>The {@code Coder<AccumT>} is determined from the given {@code Coder<InputT>}; this factory
   * should only be used to initialize static values.
   *
   * @param id the raw id of the tag
   * @param inputCoder the coder for the input type
   * @param combineFn the combine function handed to {@link StateSpecs#combiningFromInputInternal}
   * @return a new user-kind tag for a {@link CombiningState}
   */
  public static <InputT, AccumT, OutputT>
      StateTag<CombiningState<InputT, AccumT, OutputT>> combiningValueFromInputInternal(
          String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
    StructuredId userId = new StructuredId(id);
    StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec =
        StateSpecs.combiningFromInputInternal(inputCoder, combineFn);
    return new SimpleStateTag<>(userId, combiningSpec);
  }



  /**
   * Creates a tag for state that is optimized for adding values frequently and occasionally
   * retrieving all the values that have been added.
   *
   * @param id the raw id of the tag
   * @param elemCoder the coder handed to {@link StateSpecs#bag} to build the tag's spec
   * @return a new user-kind tag for a {@link BagState}
   */
  public static <T> StateTag<BagState<T>> bag(String id, Coder<T> elemCoder) {
    StructuredId userId = new StructuredId(id);
    StateSpec<BagState<T>> bagSpec = StateSpecs.bag(elemCoder);
    return new SimpleStateTag<>(userId, bagSpec);
  }



  /**
   * Creates a state tag that supports {@link java.util.Set}-like access patterns.
   *
   * @param id the raw id of the tag
   * @param elemCoder the coder handed to {@link StateSpecs#set} to build the tag's spec
   * @return a new user-kind tag for a {@link SetState}
   */
  public static <T> StateTag<SetState<T>> set(String id, Coder<T> elemCoder) {
    StructuredId userId = new StructuredId(id);
    StateSpec<SetState<T>> setSpec = StateSpecs.set(elemCoder);
    return new SimpleStateTag<>(userId, setSpec);
  }



  /**
   * Creates a state tag that supports {@link java.util.Map}-like access patterns.
   *
   * @param id the raw id of the tag
   * @param keyCoder the coder for map keys
   * @param valueCoder the coder for map values
   * @return a new user-kind tag for a {@link MapState}
   */
  public static <K, V> StateTag<MapState<K, V>> map(
      String id, Coder<K> keyCoder, Coder<V> valueCoder) {
    StructuredId userId = new StructuredId(id);
    StateSpec<MapState<K, V>> mapSpec = StateSpecs.map(keyCoder, valueCoder);
    return new SimpleStateTag<>(userId, mapSpec);
  }



  /**
   * Creates a state tag for holding the watermark.
   *
   * @param <W> not referenced by the parameters or the return type; kept in the signature as is
   * @param id the raw id of the tag
   * @param timestampCombiner the combiner handed to {@link StateSpecs#watermarkStateInternal}
   * @return a new user-kind tag for a {@link WatermarkHoldState}
   */
  public static <W extends BoundedWindow> StateTag<WatermarkHoldState> watermarkStateInternal(
      String id, TimestampCombiner timestampCombiner) {
    StructuredId userId = new StructuredId(id);
    StateSpec<WatermarkHoldState> holdSpec =
        StateSpecs.watermarkStateInternal(timestampCombiner);
    return new SimpleStateTag<>(userId, holdSpec);
  }



  /**
   * Converts an arbitrary {@link StateTag} into a system-internal tag that is guaranteed not to
   * collide with any user tags.
   *
   * @param tag the tag to convert; it must implement the private {@code SystemStateTag} interface
   * @return the result of asking {@code tag} for its {@code SYSTEM}-kind counterpart
   * @throws IllegalArgumentException if {@code tag} does not implement {@code SystemStateTag}
   *     (a {@code null} tag fails this check as well)
   */
  public static <StateT extends State> StateTag<StateT> makeSystemTagInternal(
      StateTag<StateT> tag) {
    if (tag instanceof SystemStateTag) {
      // The instanceof test on the line above makes this cast safe.
      @SuppressWarnings("unchecked")
      SystemStateTag<StateT> convertible = (SystemStateTag<StateT>) tag;
      return convertible.asKind(StateKind.SYSTEM);
    }
    throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag);
  }



  /**
   * Derives a {@link BagState} tag over accumulators from a {@link CombiningState} tag.
   *
   * <p>The result reuses the string returned by {@code combiningTag.getId()}, wrapped as a new
   * user-kind structured id, and takes its spec from {@link StateSpecs#convertToBagSpecInternal}.
   *
   * @param combiningTag the combining tag whose id and spec are converted
   * @return a new tag for a bag of {@code AccumT} values
   */
  public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>> convertToBagTagInternal(
      StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) {
    StructuredId bagId = new StructuredId(combiningTag.getId());
    StateSpec<BagState<AccumT>> bagSpec =
        StateSpecs.convertToBagSpecInternal(combiningTag.getSpec());
    return new SimpleStateTag<>(bagId, bagSpec);
  }



  /** Identifier of a tag: a raw id string qualified by the {@link StateKind} it belongs to. */
  private static class StructuredId implements Serializable {
    private final StateKind kind;
    private final String rawId;

    /** Creates an id of kind {@link StateKind#USER} for the given raw id. */
    private StructuredId(String rawId) {
      this(StateKind.USER, rawId);
    }

    /** Creates an id with an explicit kind. */
    private StructuredId(StateKind kind, String rawId) {
      this.rawId = rawId;
      this.kind = kind;
    }

    /** Returns a new id with the same raw id but the given {@code kind}. */
    public StructuredId asKind(StateKind kind) {
      return new StructuredId(kind, this.rawId);
    }

    /**
     * Writes the kind's prefix character followed by the raw id.
     *
     * @param sb the destination to append to
     * @throws IOException if appending to {@code sb} fails
     */
    public void appendTo(Appendable sb) throws IOException {
      // Continue on whatever the first append returns, exactly like a chained call would.
      Appendable afterPrefix = sb.append(kind.prefix);
      afterPrefix.append(rawId);
    }

    /** Returns the raw id without any kind prefix. */
    public String getRawId() {
      return this.rawId;
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(getClass())
          .add("id", rawId)
          .add("kind", kind)
          .toString();
    }

    /** Two ids are equal when both their kind and their raw id are equal. */
    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj instanceof StructuredId) {
        StructuredId other = (StructuredId) obj;
        return Objects.equals(kind, other.kind) && Objects.equals(rawId, other.rawId);
      }
      return false;
    }

    @Override
    public int hashCode() {
      return Objects.hash(this.kind, this.rawId);
    }
  }



  /**
   * A basic {@link StateTag} implementation that manages the structured ids.
   *
   * <p>Equality and hashing take the whole structured id into account, that is both the raw id
   * and the {@link StateKind}, together with the spec. A tag returned by {@link #asKind} with a
   * different kind is therefore never equal to the tag it was derived from, even though {@link
   * #getId()} returns the same raw id for both.
   */
  private static class SimpleStateTag<StateT extends State>
      implements StateTag<StateT>, SystemStateTag<StateT> {

    private final StateSpec<StateT> spec;
    private final StructuredId id;

    /**
     * @param id the structured id (raw id plus kind) of this tag
     * @param spec the spec returned by {@link #getSpec()}
     */
    public SimpleStateTag(StructuredId id, StateSpec<StateT> spec) {
      this.id = id;
      this.spec = spec;
    }

    /**
     * Binds through the spec, passing the raw id and an adapter around {@code binder}.
     *
     * @deprecated use {@link StateSpec#bind} method via {@link #getSpec} for now.
     */
    @Override
    @Deprecated
    public StateT bind(StateTag.StateBinder binder) {
      return spec.bind(this.id.getRawId(), adaptTagBinder(binder));
    }

    /** Returns the raw id only; the kind is not part of the returned string. */
    @Override
    public String getId() {
      return id.getRawId();
    }

    @Override
    public StateSpec<StateT> getSpec() {
      return spec;
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(getClass()).add("id", id).toString();
    }

    /** Appends the structured id, i.e. the kind's prefix character followed by the raw id. */
    @Override
    public void appendTo(Appendable sb) throws IOException {
      id.appendTo(sb);
    }

    /** Returns a new tag with the same raw id and spec but the given {@code kind}. */
    @Override
    public StateTag<StateT> asKind(StateKind kind) {
      return new SimpleStateTag<>(id.asKind(kind), spec);
    }

    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }
      if (!(other instanceof SimpleStateTag)) {
        return false;
      }

      SimpleStateTag<?> otherTag = (SimpleStateTag<?>) other;
      // Compare the structured id rather than getId(): getId() exposes only the raw id, which
      // would make a system tag equal to the user tag that shares its raw id and spec.
      return Objects.equals(this.id, otherTag.id) && Objects.equals(this.spec, otherTag.spec);
    }

    @Override
    public int hashCode() {
      // Hash the same components equals() compares so the two stay consistent.
      return Objects.hash(getClass(), this.id, this.spec);
    }
  }

}